Skip to content

perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146

Open
deeppatel710 wants to merge 4 commits intoapache:masterfrom
deeppatel710:perf/single-thread-combine-operator
Open

perf: skip concurrency overhead in BaseSingleBlockCombineOperator when numTasks=1#18146
deeppatel710 wants to merge 4 commits intoapache:masterfrom
deeppatel710:perf/single-thread-combine-operator

Conversation

@deeppatel710
Copy link
Copy Markdown
Contributor

Summary

Fixes #14617

When a query runs with a single execution task (one segment, or maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the full multi-thread overhead:

  • ExecutorService.submit() — thread pool task submission
  • Phaser — register/deregister synchronization
  • BlockingQueue.poll() — with timeout waiting
  • AtomicInteger / AtomicReference — volatile reads/writes on the hot path

None of this is necessary when _numTasks == 1.

Change

Added a getNextBlockSingleThread() fast path in BaseSingleBlockCombineOperator.getNextBlock(): when _numTasks == 1 and _resultsBlockMerger is non-null, all segments are processed sequentially on the
calling thread with no synchronization overhead. CPU time and memory allocation are still tracked via ThreadResourceSnapshot.

Subclasses that override mergeResults() with custom logic (e.g. SequentialSortedGroupByCombineOperator, which passes null for _resultsBlockMerger) are unaffected and fall through to the standard
multi-thread path.

Test plan

  • All existing combine operator tests pass (125 tests)
  • CombineSlowOperatorsTest, CombineErrorOperatorsTest, SelectionCombineOperatorTest, SortedGroupByCombineOperatorsTest, CombinePlanNodeTest — all green

…n numTasks=1

When a query runs with a single execution task (e.g. one segment or
maxExecutionThreads=1), BaseSingleBlockCombineOperator still incurred the
full multi-thread overhead: ExecutorService.submit(), Phaser registration/
deregistration, BlockingQueue.poll() with timeout, AtomicInteger, and
AtomicReference.

This adds a single-thread fast path in getNextBlock(): when _numTasks==1
and _resultsBlockMerger is non-null (i.e. the subclass uses the default
merge strategy), segments are processed sequentially on the calling thread
with none of that synchronization overhead. CPU time and memory are still
tracked via ThreadResourceSnapshot.

Subclasses that override mergeResults() with custom logic (e.g.
SequentialSortedGroupByCombineOperator, which passes null for
_resultsBlockMerger) are unaffected and continue using the standard path.

Fixes apache#14617

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@xiangfu0 xiangfu0 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a critical correctness issue with the fast path implementation:

Timeout Handling Missing: The new getNextBlockSingleThread() method lacks timeout protection that exists in the original mergeResults() method. The original method checks _queryContext.getEndTimeMs() and returns a timeout results block if the deadline is exceeded. The fast path has no such protection, which means:

  1. A hanging segment operator could block indefinitely
  2. No respect for query timeout deadline
  3. Potential resource exhaustion if a segment operator stalls

The original mergeResults() path handles this via the _blockingQueue.poll(waitTimeMs, TimeUnit.MILLISECONDS) with explicit timeout checking. The fast path should implement similar timeout protection.

}
}

/// Processes all segments sequentially on the calling thread when only one task is needed.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing timeout protection. This fast path should respect _queryContext.getEndTimeMs() like the original mergeResults() method does. A hanging segment operator could block indefinitely here without timeout checking.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Added a System.currentTimeMillis() >= endTimeMs check at the top of each loop iteration before invoking operator.nextBlock(). If the deadline is exceeded, getTimeoutResultsBlock(i) is returned
immediately — exactly mirroring the waitTimeMs <= 0 guard in mergeResults(). A stalled segment operator will now be bypassed at the next iteration boundary rather than blocking indefinitely.

…ombineOperator

The getNextBlockSingleThread() fast path was missing the timeout protection
present in the original mergeResults() method. A stalled segment operator
could block indefinitely with no deadline enforcement.

Fix: check System.currentTimeMillis() >= endTimeMs before invoking each
segment operator. If the deadline is exceeded, return a timeout results block
immediately. This mirrors the waitTimeMs <= 0 guard in mergeResults().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@codecov-commenter
Copy link
Copy Markdown

codecov-commenter commented Apr 12, 2026

Codecov Report

❌ Patch coverage is 48.00000% with 13 lines in your changes missing coverage. Please review.
✅ Project coverage is 63.39%. Comparing base (2e80bff) to head (ce8cec5).
⚠️ Report is 23 commits behind head on master.

Files with missing lines Patch % Lines
...erator/combine/BaseSingleBlockCombineOperator.java 48.00% 7 Missing and 6 partials ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18146      +/-   ##
============================================
+ Coverage     63.04%   63.39%   +0.34%     
- Complexity     1617     1627      +10     
============================================
  Files          3202     3229      +27     
  Lines        194718   196730    +2012     
  Branches      30047    30415     +368     
============================================
+ Hits         122760   124714    +1954     
+ Misses        62233    62016     -217     
- Partials       9725    10000     +275     
Flag Coverage Δ
custom-integration1 100.00% <ø> (ø)
integration 100.00% <ø> (ø)
integration1 100.00% <ø> (ø)
integration2 0.00% <ø> (ø)
java-11 63.36% <48.00%> (+0.35%) ⬆️
java-21 63.28% <48.00%> (+0.26%) ⬆️
temurin 63.39% <48.00%> (+0.34%) ⬆️
unittests 63.39% <48.00%> (+0.34%) ⬆️
unittests1 55.28% <48.00%> (-0.29%) ⬇️
unittests2 35.03% <32.00%> (+1.59%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Deep Patel and others added 2 commits April 12, 2026 15:04
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…hread fast path

Two bugs in getNextBlockSingleThread():

1. Resource stats double-counting:
   The method accumulated _totalWorkerThreadCpuTimeNs on the calling thread,
   but InstanceResponseOperator.getBaseBlock() already captures the same
   thread's CPU time as mainThreadCpuTimeNs. The calSystemActivitiesCpuTimeNs
   formula (wallClock - mainThread - workerThread/N) then subtracted the same
   work twice, producing a negative value clamped to 0. This broke
   testResourceUsageStats in the CPU/memory query killing integration tests.
   Fix: remove the ThreadResourceSnapshot tracking; the main thread's snapshot
   already accounts for all work done in the single-thread path.

2. Exception handling for operator failures:
   wrapOperatorException() throws (not returns) a new QueryException for most
   RuntimeExceptions. Calling it as an argument to createExceptionResultsBlock-
   AndAttachExecutionStats caused the thrown exception to escape the catch block
   uncaught, propagating to callers instead of being wrapped in an error block.
   This broke JsonExtractScalarTransformFunctionTest.mvWithNullsWithoutDefault.
   Fix: wrap in try/throw/catch to capture whatever wrapOperatorException
   throws or returns, then convert to an error block — mirroring the multi-
   thread path's onProcessSegmentsException handler.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add special combine operator for single thread case

3 participants